Steps:
load text fragment mechanism (text based: mech and smiles)
create fragments and fragment reactions (from smiles, check isomorphic duplicate, add reaction_repr for fragment reaction)
get thermo and kinetics
Input:
Output:
IMPORTANT: USE RMG-Py frag_kinetics_gen_new branch, RMG-dabase frag_kinetics_gen_new branch, rmg_env
In [ ]:
import os
from tqdm import tqdm
In [ ]:
from rmgpy import settings
from rmgpy.data.rmg import RMGDatabase
from rmgpy.kinetics import KineticsData
from rmgpy.rmg.model import getFamilyLibraryObject
from rmgpy.data.kinetics.family import TemplateReaction
from rmgpy.data.kinetics.depository import DepositoryReaction
from rmgpy.data.kinetics.common import find_degenerate_reactions
from rmgpy.chemkin import saveChemkinFile, saveSpeciesDictionary
In [ ]:
import afm
import afm.fragment
import afm.reaction
In [ ]:
def read_frag_mech(frag_mech_path):
reaction_string_dict = {}
current_family = ''
with open(frag_mech_path) as f_in:
for line in f_in:
if line.startswith('#') and ':' in line:
_, current_family = [token.strip() for token in line.split(':')]
elif line.strip() and not line.startswith('#'):
reaction_string = line.strip()
if current_family not in reaction_string_dict:
reaction_string_dict[current_family] = [reaction_string]
else:
reaction_string_dict[current_family].append(reaction_string)
return reaction_string_dict
def parse_reaction_string(reaction_string):
reactant_side, product_side = [token.strip() for token in reaction_string.split('==')]
reactant_strings = [token.strip() for token in reactant_side.split('+')]
product_strings = [token.strip() for token in product_side.split('+')]
return reactant_strings, product_strings
In [ ]:
job_name = 'two-sided_newcut1'
afm_base = os.path.dirname(afm.__path__[0])
working_dir = os.path.join(afm_base, 'examples', 'pdd_chemistry', job_name)
In [ ]:
# load RMG database to create reactions
database = RMGDatabase()
In [ ]:
database.load(
path = settings['database.directory'],
thermoLibraries = ['primaryThermoLibrary'], # can add others if necessary
kineticsFamilies = 'all',
reactionLibraries = [],
kineticsDepositories = ''
)
thermodb = database.thermo
# Add training reactions
for family in database.kinetics.families.values():
family.addKineticsRulesFromTrainingSet(thermoDatabase=thermodb)
# average up all the kinetics rules
for family in database.kinetics.families.values():
family.fillKineticsRulesByAveragingUp()
In [ ]:
# load fragment from smiles-like string
fragment_smiles_filepath = os.path.join(working_dir, 'fragment_smiles.txt')
fragments = []
with open(fragment_smiles_filepath) as f_in:
for line in f_in:
if line.strip() and not line.startswith('#') and ':' in line:
label, smiles = [token.strip() for token in line.split(":")]
frag = afm.fragment.Fragment(label=label).from_SMILES_like_string(smiles)
frag.assign_representative_species()
frag.species_repr.label = label
for prev_frag in fragments:
if frag.isIsomorphic(prev_frag):
raise Exception('Isomorphic duplicate found: {0} and {1}'.format(label, prev_frag.label))
fragments.append(frag)
# construct label-key fragment dictionary
fragment_dict = {}
for frag0 in fragments:
if frag0.label not in fragment_dict:
fragment_dict[frag0.label] = frag0
else:
raise Exception('Fragment with duplicated labels found: {0}'.format(frag0.label))
In [ ]:
# put aromatic isomer in front of species.molecule
# 'cause that's the isomer we want to react
for frag in fragments:
species = frag.species_repr
species.generateResonanceIsomers()
for mol in species.molecule:
if mol.isAromatic():
species.molecule = [mol]
break
In [ ]:
# load fragment mech in text
fragment_mech_filepath = os.path.join(working_dir, 'frag_mech.txt')
reaction_string_dict = read_frag_mech(fragment_mech_filepath)
# generate reactions
fragment_rxns = []
for family_label in reaction_string_dict:
# parse reaction strings
print "Processing {0}...".format(family_label)
for reaction_string in tqdm(reaction_string_dict[family_label]):
reactant_strings, product_strings = parse_reaction_string(reaction_string)
reactants = [fragment_dict[reactant_string].species_repr for reactant_string in reactant_strings]
products = [fragment_dict[product_string].species_repr.molecule[0] for product_string in product_strings]
for idx, reactant in enumerate(reactants):
for mol in reactant.molecule:
mol.props['label'] = reactant_strings[idx]
for idx, product in enumerate(products):
product.props['label'] = product_strings[idx]
# this script requires reactants to be a list of Species objects
# products to be a list of Molecule objects.
# returned rxns have reactants and products in Species type
new_rxns = database.kinetics.generate_reactions_from_families(reactants=reactants,
products=products,
only_families=[family_label],
resonance=True)
if len(new_rxns) != 1:
print reaction_string + family_label
raise Exception('Non-unique reaction is generated with {0}'.format(reaction_string))
# create fragment reactions
rxn = new_rxns[0]
fragrxn = afm.reaction.FragmentReaction(index=-1,
reversible=True,
family=rxn.family,
reaction_repr=rxn)
fragment_rxns.append(fragrxn)
In [ ]:
from rmgpy.data.rmg import getDB
from rmgpy.thermo.thermoengine import processThermoData
from rmgpy.thermo import NASA
import rmgpy.constants as constants
import math
In [ ]:
thermodb = getDB('thermo')
# calculate thermo for each species
for fragrxn in tqdm(fragment_rxns):
rxn0 = fragrxn.reaction_repr
for spe in rxn0.reactants + rxn0.products:
thermo0 = thermodb.getThermoData(spe)
if spe.label in ['RCCCCR', 'LCCCCR', 'LCCCCL']:
thermo0.S298.value_si += constants.R * math.log(2)
spe.thermo = processThermoData(spe, thermo0, NASA)
family = getFamilyLibraryObject(rxn0.family)
# Get the kinetics for the reaction
kinetics, source, entry, isForward = family.getKinetics(rxn0, \
templateLabels=rxn0.template, degeneracy=rxn0.degeneracy, \
estimator='rate rules', returnAllKinetics=False)
rxn0.kinetics = kinetics
if not isForward:
rxn0.reactants, rxn0.products = rxn0.products, rxn0.reactants
rxn0.pairs = [(p,r) for r,p in rxn0.pairs]
# convert KineticsData to Arrhenius forms
if isinstance(rxn0.kinetics, KineticsData):
rxn0.kinetics = rxn0.kinetics.toArrhenius()
# correct barrier heights of estimated kinetics
if isinstance(rxn0,TemplateReaction) or isinstance(rxn0,DepositoryReaction): # i.e. not LibraryReaction
rxn0.fixBarrierHeight() # also converts ArrheniusEP to Arrhenius.
fragrxts = [fragment_dict[rxt.label] for rxt in rxn0.reactants]
fragprds = [fragment_dict[prd.label] for prd in rxn0.products]
fragpairs = [(fragment_dict[p0.label],fragment_dict[p1.label]) for p0,p1 in rxn0.pairs]
fragrxn.reactants=fragrxts
fragrxn.products=fragprds
fragrxn.pairs=fragpairs
fragrxn.kinetics=rxn0.kinetics
In [ ]:
for frag in fragments:
spe = frag.species_repr
thermo0 = thermodb.getThermoData(spe)
if spe.label in ['RCCCCR', 'LCCCCR', 'LCCCCL']:
thermo0.S298.value_si += constants.R * math.log(2)
spe.thermo = processThermoData(spe, thermo0, NASA)
if spe.label in ['RCCCCR', 'LCCCCR', 'LCCCCL']:
print spe.label
print spe.getFreeEnergy(670)/4184
In [ ]:
for fragrxn in tqdm(fragment_rxns):
rxn0 = fragrxn.reaction_repr
if rxn0.family in ['R_Recombination', 'H_Abstraction', 'R_Addition_MultipleBond']:
for spe in rxn0.reactants + rxn0.products:
if spe.label in ['RCC*CCR', 'LCC*CCR', 'LCC*CCL']:
rxn0.kinetics.changeRate(4)
fragrxn.kinetics=rxn0.kinetics
In [ ]:
species_list = []
for frag in fragments:
species = frag.species_repr
species_list.append(species)
len(fragments)
In [ ]:
reaction_list = []
for fragrxn in fragment_rxns:
rxn = fragrxn.reaction_repr
reaction_list.append(rxn)
len(reaction_list)
In [ ]:
# dump chemkin files
chemkin_path = os.path.join(working_dir, 'chem_annotated.inp')
dictionaryPath = os.path.join(working_dir, 'species_dictionary.txt')
saveChemkinFile(chemkin_path, species_list, reaction_list)
saveSpeciesDictionary(dictionaryPath, species_list)
In [ ]:
def update_atom_count(tokens, parts, R_count):
# remove R_count*2 C and R_count*5 H
string = ''
if R_count == 0:
return 'G'.join(parts)
else:
H_count = int(tokens[2].split('C')[0])
H_count_update = H_count - 5*R_count
C_count = int(tokens[3])
C_count_update = C_count - 2*R_count
tokens = tokens[:2] + [str(H_count_update)+'C'] + [C_count_update]
# Line 1
string += '{0:<16} '.format(tokens[0])
string += '{0!s:<2}{1:>3d}'.format('H', H_count_update)
string += '{0!s:<2}{1:>3d}'.format('C', C_count_update)
string += ' ' * (4 - 2)
string += 'G' + parts[1]
return string
In [ ]:
corrected_chemkin_path = os.path.join(working_dir, 'chem_annotated.inp')
In [ ]:
output_string = ''
with open(chemkin_path) as f_in:
readThermo = False
for line in f_in:
if line.startswith('THERM ALL'):
readThermo = True
if not readThermo:
output_string += line
continue
if line.startswith('!'):
output_string += line
continue
if 'G' in line and '1' in line:
parts = [part for part in line.split('G')]
tokens = [token.strip() for token in parts[0].split()]
species_label = tokens[0]
R_count = species_label.count('R')
L_count = species_label.count('L')
updated_line = update_atom_count(tokens, parts, R_count+L_count)
output_string += updated_line
else:
output_string += line
with open(corrected_chemkin_path, 'w') as f_out:
f_out.write(output_string)
In [ ]: